iT邦幫忙

2022 iThome 鐵人賽

DAY 19
0
自我挑戰組

Spring In Action系列 第 19

Reactive programming & Reactive stream specification

  • 分享至 

  • xImage
  •  

這章節會介紹reactive programming, 有別於imperative programming會循序執行,reactive programming會先建立stream,程式內部會以非同步的方式執行使資料流經這些stream來達到想要的處理效果。

imperative programming:

String name = "Bond";
String upperCaseName = name.toUpperCase();
String sayHello = "Hello, " + upperCaseName + "!";
System.out.println(sayHello);

reactive programming:

Mono.just("Bond")
    .map(n -> n.toUpperCase())
    .map(cn -> "Hello, " + cn + "!")
    .subscribe(System.out::println);

Reactive programming會使用到的stream基本上會follow Reactive Stream specification,而這個規格的實作在Spring WebFlux中會使用Project Reactor,這個Reactor library可說是在Spring中使用reactive programming的一切基底。

Reactive Stream specification主要由四個介面來構成:

1.Publisher

public interface Publisher<T> {
  void subscribe(Subscriber<? super T> subscriber);
}

Publisher可說是一切的起點,其代表的意義就是資料的產出者,當它被呼叫了唯一的一個方法subscribe時會需要傳入一個Subscriber,代表是誰來去接收Pulisher產生的資料。

2.Subscriber

public interface Subscriber<T> {
  void onSubscribe(Subscription sub);
  void onNext(T item);
  void onError(Throwable ex);
  void onComplete();
}

當Subscriber被傳入Publisher的subscribe方法後,實作內容就會拿著這個subscriber來定義4種subscriber在接收資料後的動作。第一步就是onSubscribe,它代表當subscriber在初出被publisher subscribe後該做什麼,這時會傳入一個Subscription。而onNext, onError, onComplete就是在subscribe後實際處理資料時會發生的狀態。

3.Subscription

public interface Subscription {
  void request(long n);
  void cancel();
}

在subscriber被publisher subscribe後,通常就是在publisher內實作subscriber的onSubscribe;而在Subscription中的request代表一次要拿多少資料,以long n來做一個門檻值的管控,假若有任何不能處理的條件達成後,就會執行cancel方法。而在request的實作中會去定義subscriber的onNext, onComplete, onError。

4.Processor

public interface Processor<T, R>
         extends Subscriber<T>, Publisher<R> {}

一個更有彈性的介面。

以下引用了Java 9的Flow API介紹中舉的例子([https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html]),來有一個具體的理解:

class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }

上一篇
Create an email integration flow
下一篇
Mono / Flux operation
系列文
Spring In Action30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言